當我們要確保資料是否有成功insert,除了使用程式邏輯上的Lock控制還會搭配到使用事務執行流程去管控。
透過以下應用舉例,達到控制寫入資料&事務流程設計。 過程註解寫在代碼中
Golang
假設有5筆資料要做寫入,流程上如何達到(寫語法)與(執行語法)錯開的狀況,並透過事務流程完成提交。
組語法部分 : 每5s取一筆數據去組insert into .... values (...)的values資料。
執行寫入DB部分: 每10s觸發一次提交數據。
先看程式執行完成LOG:
昨天先建好的資料:
mysql> select * from user_lists;
+---------+---------+-------+-----------------+-------------+
| user_id | account | level | last_login_time | create_time |
+---------+---------+-------+-----------------+-------------+
| 100001 | siang05 | 1 | 1630385990 | 1330385990 |
| 100002 | siang01 | 1 | 1630385990 | 1330385990 |
| 100003 | siang02 | 1 | 1630385990 | 1330385990 |
| 100004 | siang03 | 1 | 1630385990 | 1330385990 |
| 100005 | siang04 | 1 | 1630385990 | 1330385990 |
+---------+---------+-------+-----------------+-------------+
5 rows in set (0.00 sec)
package main
import (
"database/sql"
"fmt"
"sync"
"time"
_ "github.com/go-sql-driver/mysql"
)
func main() {
//Init connect object. content-> account:password@tcp(host_ip:port)/db_name
dbu, err := sql.Open("mysql", "root:1234@tcp(127.0.0.1:3306)/user?charset=utf8&parseTime=True")
if err != nil {
fmt.Println("open mysql error", err)
return
}
//close conn.
defer dbu.Close()
//Create mysql connect.
err = dbu.Ping()
if err != nil {
fmt.Println("create mysql connect error", err)
return
}
/*
>以下連接池設定
注意點:
(1.)mysql連線預設保存時間為8小時,閒置超過8小時會被mysql斷開變失效連線。
查詢: show variables like '%wait_timeout%';
Q: 使用到被mysql斷開的連線會產生ERROR -> packets.go:36: unexpected EOF
(2.)mysql最大連線數。
查詢: show variables like '%max_connections%';
*/
//設置最大併發連線數,超過連線數需等待,直到其中連接被釋放並變為空閒。
dbu.SetMaxOpenConns(100)
//設置最大的空閒連接數,適當的設定空閒連接數(會佔用內存)將提高性能,減少從頭建立新連接的可能。
dbu.SetMaxIdleConns(10)
//設置連線的生命週期,過期後無法重用。
dbu.SetConnMaxLifetime(30)
/*
以下內容單純舉例大致流程
重點: 資料寫入的流程 or 如果寫入發生中斷造成失敗要怎麼處理後續步驟~ 在於個人設計了...舉例上沒有多做這部分邏輯處理。
流程上模擬: 今天有5筆資料要做寫入,流程上如何達到(寫語法)與(執行語法)錯開的狀況,並透過事務流程完成提交。
在整個流程上 => 組語法部分 : 每5s取一筆數據組insert into .... values (...) 的values資料。
執行寫入DB部分: 每10s觸發一次提交數據。
*/
//假設有5筆資料要做寫入 (寫入資料來源)
chanBuf := make(chan string, 5)
chanBuf <- fmt.Sprintf("(%d,'%s',%d,%d,%d),", 100002, "siang01", 1, 1630385990, 1330385990)
chanBuf <- fmt.Sprintf("(%d,'%s',%d,%d,%d),", 100003, "siang02", 1, 1630385990, 1330385990)
chanBuf <- fmt.Sprintf("(%d,'%s',%d,%d,%d),", 100004, "siang03", 1, 1630385990, 1330385990)
chanBuf <- fmt.Sprintf("(%d,'%s',%d,%d,%d),", 100005, "siang04", 1, 1630385990, 1330385990)
chanBuf <- fmt.Sprintf("(%d,'%s',%d,%d,%d),", 100001, "siang05", 1, 1630385990, 1330385990)
fmt.Printf("目前等待寫入筆數: %d\n", len(chanBuf))
go GetSqlData(chanBuf)
go ExecTransaction(dbu)
//阻塞主線程防提早執行完,確認5筆資料都完成才退出。
for {
if checker == 5 {
fmt.Printf("All finish")
break
}
}
}
var locks sync.Mutex
var values string //組語法值的部分
var checker int64 //檢查筆數
//GetSqlData 組合寫入資料內容
func GetSqlData(data chan string) {
for {
//為了能看出流程怎麼走才加的
time.Sleep(5 * time.Second)
//讀取data
info := <-data
//防止定時執行寫入機制觸發時還在組寫入資料
locks.Lock()
values += info
locks.Unlock()
}
}
//RollbackTX 回滾事務釋放連線
func RollbackTX(tx *sql.Tx) error {
err := tx.Rollback()
if err != sql.ErrTxDone && err != nil {
fmt.Println("[ERROR] tx rollback error", err)
return err
}
return nil
}
//ExecTransaction 執行事務提交流程
func ExecTransaction(dbu *sql.DB) {
//INSERT組資料的來源看個人設計, 這邊locks機制是為了確保組合SQL的部分不會再有資料寫入
insert := "INSERT INTO user_lists(user_id,account,level,last_login_time,create_time) values"
//設置計時器每10秒觸發一次寫入流程
ticker := time.NewTicker(10 * time.Second)
for range ticker.C {
if values != "" {
//處理語法字串的最後一個逗點變成結束符號(, -> ;)
if values[len(values)-1:] == "," {
values = values[0:len(values)-1] + ";"
}
//完整寫入語法
sql := insert + values
fmt.Println("[READY] 本次執行語法: ", sql)
fmt.Println("[START] 進入Transaction 流程")
//lock -> 禁止values繼續寫入新資料
locks.Lock()
//進入交易模式
fmt.Println("[START] 進入Begin 流程")
tx, beginErr := dbu.Begin()
if beginErr != nil {
if errs := RollbackTX(tx); errs != nil {
fmt.Println("[ERROR] Begin rollback error: ", errs)
}
fmt.Println("[ERROR] Begin error: ", beginErr)
values = ""
locks.Unlock()
continue
}
//執行insert動作
fmt.Println("[START] 進入Exec SQL 流程")
rows, execErr := tx.Exec(sql)
if execErr != nil {
if errs := RollbackTX(tx); errs != nil {
fmt.Println("[ERROR] Exec rollback error: ", errs)
}
fmt.Println("[ERROR] Exec error: ", execErr)
values = ""
locks.Unlock()
continue
}
//提交事務
fmt.Println("[START] 進入Commit 流程")
if CommitErr := tx.Commit(); CommitErr != nil {
if errs := RollbackTX(tx); errs != nil {
fmt.Println("[ERROR] Commit rollback error: ", errs)
}
fmt.Println("[ERROR] Commit error: ", CommitErr)
values = ""
locks.Unlock()
continue
}
//檢查筆數目前已經到幾筆了(返回筆數)
check, _ := rows.RowsAffected()
fmt.Printf("[LOG] 目前完成筆數: %d\n", checker+check)
checker += check
//將寫入成功的數據清除
values = ""
//解鎖-> values可以繼續寫入
locks.Unlock()
fmt.Println("[OK] 完成本次作業")
}
}
}
明天來介紹mysql重要的索引 !!